package penging.rocketmq;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.penging.infra.util.ApplicationUtil;
import com.penging.infra.util.ClassUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import penging.em.FragmentMq;

import java.util.List;

/**
 * RocketMQ consumer bootstrapper: scans a configured base package for classes
 * annotated with {@link FragmentMq} and starts one push consumer per annotated
 * class, delegating every received message to the configured
 * {@link RocketMqMessageListener}.
 *
 * <p>The listener must be supplied through
 * {@link #setRocketMqMessageListener(RocketMqMessageListener)} before
 * {@link #init()} or {@link #startConsumer(String)} is called; otherwise no
 * consumer is started.
 */
@Service
public class Consumer implements DatabaseInterface{

    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);

    /** Listener that every started consumer forwards its messages to. */
    private RocketMqMessageListener rocketMqMessageListener;

    /**
     * @return the listener currently configured for the consumers, or {@code null} if none was set
     */
    public RocketMqMessageListener getRocketMqMessageListener() {
        return rocketMqMessageListener;
    }

    /**
     * @param rocketMqMessageListener the listener that started consumers will delegate to
     */
    public void setRocketMqMessageListener(RocketMqMessageListener rocketMqMessageListener) {
        this.rocketMqMessageListener = rocketMqMessageListener;
    }

    /**
     * Scans the package named by the {@code SYSTEM_PACKAGE_URL} environment property
     * and starts a consumer for every class carrying a {@link FragmentMq} annotation,
     * using the annotation's {@code nameMq()} value as the consumer group name.
     *
     * <p>Does nothing when the property is not set. Any exception raised while
     * scanning is logged and not propagated.
     */
    public void init(){
        try {
            // NOTE(review): SYSTEM_PACKAGE_URL is presumably a constant inherited from DatabaseInterface - confirm.
            String scanBasePackage = ApplicationUtil.getApplicationContext().getEnvironment().getProperty(SYSTEM_PACKAGE_URL);
            if (scanBasePackage == null) {
                return;
            }
            logger.info("消费者启动，开始扫描指定目录下全部的类。。。。。");
            // Find every class in the scanned package that carries the @FragmentMq annotation.
            List<Class<?>> classes = ClassUtil.getClasses(scanBasePackage);
            int found = 0;
            for (Class<?> clazz : classes) {
                FragmentMq annotation = clazz.getAnnotation(FragmentMq.class);
                if (annotation == null) {
                    continue;
                }
                String groupName = annotation.nameMq();
                found++;
                logger.info("第<{}>个找到有注解的类={}", found, groupName);
                startConsumer(groupName);
            }
        } catch (Exception e) {
            logger.error("初始化消费者扫描类异常",e);
        }
    }

    /**
     * Starts a push consumer for the given group.
     *
     * <p>The group name is also used as the topic to subscribe to, with all tags
     * ({@code "*"}). The name server address is read from the
     * {@code SYSTEM_NAMESRV_ADDR} environment property. Failures while subscribing
     * or starting are logged and not propagated.
     *
     * @param groupName consumer group (and topic) name; when {@code null} or empty
     *                  an error is logged and nothing is started
     */
    public void startConsumer(String groupName){
        if (groupName == null || groupName.isEmpty()) {
            logger.error("消费组名不能为空");
            return;
        }
        logger.info("启动RocketMq消费者监听...{}", groupName);
        // Without a listener there is nothing to deliver messages to, so bail out
        // before creating and configuring a client instance.
        if (this.rocketMqMessageListener == null) {
            logger.error("请先定义消费者监听接口");
            return;
        }
        String nameSrvAddr = ApplicationUtil.getApplicationContext().getEnvironment().getProperty(SYSTEM_NAMESRV_ADDR);
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
        consumer.setConsumerGroup(groupName);
        consumer.setNamesrvAddr(nameSrvAddr);
        try {
            // Subscribe to every tag of the topic named after the group.
            consumer.subscribe(groupName, "*");
            // On first start, begin consuming from the head of the queue.
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // Wrap the configured listener so it can be registered with the client.
            RocketMqMessageWrapper rocketMqMessageWrapper = new RocketMqMessageWrapper();
            rocketMqMessageWrapper.setRocketMqMessageListener(this.rocketMqMessageListener);
            consumer.registerMessageListener(rocketMqMessageWrapper);
            consumer.start();
            // The placeholder is required, otherwise the group name argument is dropped from the log.
            logger.info("启动RocketMq消费者监听成功！{}", groupName);
        } catch (Exception e) {
            logger.error("启动RocketMq消费者监听异常",e);
        }
    }

}